out_http: Add option to reuse connections #4330

Merged
7 commits merged into fluent:master from out-http-reuse-connections on Apr 30, 2024

Conversation

Garfield96
Contributor

@Garfield96 Garfield96 commented Oct 22, 2023

Which issue(s) this PR fixes:
N/A

What this PR does / why we need it:
The HTTP output plugin recreates a connection for every HTTP request. In combination with TLS and high network latencies, this can result in very poor performance. This PR adds an option to reuse connections.

The implementation caches one connection per flush thread. The cache is part of the HTTP plugin instance and is implemented as an array. When a thread makes its first request, it is assigned a slot (id) in this array, and the id is stored in a thread-local variable. Since each slot is used exclusively by a single thread, no synchronization is required. If the endpoint changes at runtime, the old connection is closed and replaced by a new connection to the new endpoint. During shutdown, all open connections are closed in the plugin's close method.

This implementation works very well for a static endpoint. If the endpoint changes frequently, there will be little benefit. However, it can be assumed that most users use a static endpoint.
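
A minimal sketch of that per-thread cache, with illustrative names (CacheEntry, claim_cache_slot, and the :connection_cache_id key are assumptions for this sketch, not the exact code in this PR; the real diff is quoted in the review comments below):

require 'net/http'

CacheEntry = Struct.new(:uri, :conn)

def make_request_cached(uri, req)
  # Each flush thread claims one array slot on its first request and remembers
  # the slot id in thread-local storage, so later lookups need no lock.
  id = Thread.current[:connection_cache_id] ||= claim_cache_slot  # claim_cache_slot: hypothetical mutex-guarded counter
  entry = @cache[id]
  if entry.nil? || entry.uri != uri.to_s
    # First use by this thread, or the endpoint changed: drop the stale connection.
    entry.conn.finish if entry && entry.conn&.started?
    entry = @cache[id] = CacheEntry.new(uri.to_s, Net::HTTP.start(uri.host, uri.port, @http_opt))
  end
  entry.conn.request(req)  # reuse the already-open connection
end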

Benchmark
I tested the throughput with two Fluentd instances on a single machine, one acting as the sender and the other as the receiver. Even though connection creation was very fast (both instances were located on the same machine and TLS was not enabled), activating connection reuse doubled the throughput.

Sender configuration:

<source>
  @type sample
  sample {"hello":"world"}
  tag sample
  rate 5000
</source>

<match **>
  @type http

  endpoint http://localhost:9000
  open_timeout 2
  reuse_connections true
  <format>
    @type json
  </format>
  <buffer>
    flush_interval 10s
    chunk_limit_records 1
    flush_thread_count 32
  </buffer>
</match>

Receiver configuration:

<system>
  workers 1
</system>

<source>
  @type http
  port 9000
  bind 0.0.0.0
  body_size_limit 32m
  keepalive_timeout 10s
</source>

<match **>
  @type flowcounter_simple
  unit second
</match>

Docs Changes:
fluent/fluentd-docs-gitbook#501

Release Note:
out_http: Add option to reuse connections

@daipom daipom added the enhancement Feature request or improve operations label Oct 30, 2023
@daipom daipom added this to the v1.17.0 milestone Oct 30, 2023
@Garfield96
Contributor Author

Hi @daipom,
are you fine with this change? If no major changes are required, I'll add some tests.

@Garfield96 Garfield96 force-pushed the out-http-reuse-connections branch from b00f90e to 7ba811d on November 6, 2023 at 12:17
@Garfield96
Contributor Author

Hi @daipom @ashie,
I added a test case showing that the feature works in the general case as well as with interrupted connections. Do you have an estimate of when this feature could be released?

@daipom
Contributor

daipom commented Nov 18, 2023

Thanks for this enhancement.
Sorry for my late response.
I was busy releasing v1.16.3 😢, but it was finally released on 11/14!

Now, I'm working on releasing fluent-package v5.0.2, which contains Fluentd v1.16.3.
It will be released at the end of this month.

Since this adds a new option, it will be released in v1.17.0 (fluent-package v5.1.0).
That date is not fixed yet, I think, but it will probably be early next year.
I'm sorry for not having looked at the details yet, but I will review this for v1.17.0 this month or next month.

@Garfield96 Garfield96 requested a review from ashie December 17, 2023 21:27
@Garfield96
Contributor Author

@ashie @daipom Can you please review this PR as well as #4331? Thanks

@daipom
Contributor

daipom commented Jan 15, 2024

Sorry for my late response.
I'm looking at this now.

It does not affect the existing specifications.
It looks basically good.

However, there are a few points that concern me. I will comment on them later.

Contributor

@daipom daipom left a comment

Sorry for my late response and the overall change request.

It looks good, but I believe we can use simpler logic.
Would you please consider whether it is possible to use this kind of simpler logic?

It is becoming difficult to maintain Fluentd due to the complexity of the code.
I want to keep the code as simple as possible.

Here are the main points.

  • Avoid nil initialization of the cache array.
  • Avoid managing the array index with a mutex.
  • Share the make_request logic by using the &block argument.

If you have any concerns or ideas, please let me know.

Comment on lines 106 to 114
@cache = []
@cache_id_mutex = Mutex.new
@cache_entry = Struct.new(:uri, :conn)
Contributor

I believe that we can manage these caches more simply.

Suggested change
@cache = []
@cache_id_mutex = Mutex.new
@cache_entry = Struct.new(:uri, :conn)
@cache_by_thread = {}

Why do you define the struct in the constructor?
It would be normal to define it directly under the class.

class HTTPOutput < Output
  Fluent::Plugin.register_output('http', self)

  class RetryableResponse < StandardError; end

  CacheEntry = Struct.new(:uri, :conn)
  ...

Contributor

Why do you define the struct in the constructor?
It would be normal to define it directly under the class.

I fixed this in d57402e

Comment on lines 114 to 121
# Close all open connections
@cache.each {|entry| entry.conn.finish if entry.conn&.started? }
Contributor

Suggested change
# Close all open connections
@cache.each {|entry| entry.conn.finish if entry.conn&.started? }
@cache_by_thread.each_value do |cache|
  cache.conn.finish if cache.conn.started?
end

Comment on lines 121 to 129
@cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections
@cache_id = 0

Contributor

Suggested change
@cache = Array.new(actual_flush_thread_count, @cache_entry.new("", nil)) if @reuse_connections
@cache_id = 0

We should not use the struct with nil or empty values if possible.
It makes code management difficult.

lib/fluent/plugin/out_http.rb (outdated review thread, resolved)
lib/fluent/plugin/out_http.rb (outdated review thread, resolved)
Comment on lines 263 to 341
def make_request_cached(uri, req)
  id = Thread.current.thread_variable_get(plugin_id)
  if id.nil?
    @cache_id_mutex.synchronize {
      id = @cache_id
      @cache_id += 1
    }
    Thread.current.thread_variable_set(plugin_id, id)
  end
  uri_str = uri.to_s
  if @cache[id].uri != uri_str
    @cache[id].conn.finish if @cache[id].conn&.started?
    http = if @proxy_uri
             Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
           else
             Net::HTTP.start(uri.host, uri.port, @http_opt)
           end
    @cache[id] = @cache_entry.new(uri_str, http)
  end
  @cache[id].conn.request(req)
end
Contributor

Sorry for the overall change, but I believe we can use simpler logic like this.

Suggested change
def make_request_cached(uri, req)
  id = Thread.current.thread_variable_get(plugin_id)
  if id.nil?
    @cache_id_mutex.synchronize {
      id = @cache_id
      @cache_id += 1
    }
    Thread.current.thread_variable_set(plugin_id, id)
  end
  uri_str = uri.to_s
  if @cache[id].uri != uri_str
    @cache[id].conn.finish if @cache[id].conn&.started?
    http = if @proxy_uri
             Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt)
           else
             Net::HTTP.start(uri.host, uri.port, @http_opt)
           end
    @cache[id] = @cache_entry.new(uri_str, http)
  end
  @cache[id].conn.request(req)
end
def make_request_cached(uri, req)
  thread_name = Thread.current.name
  @cache_by_thread[thread_name] ||= CacheEntry.new(uri.to_s, make_request(uri, req))
  cache = @cache_by_thread[thread_name]
  unless cache.uri == uri.to_s
    cache.conn.finish if cache.conn.started?
    @cache_by_thread[thread_name] = CacheEntry.new(uri.to_s, make_request(uri, req))
    cache = @cache_by_thread[thread_name]
  end
  cache.conn.request(req)
end

Note: we can use the thread name flush_thread_{i} for the key.

@buffer_config.flush_thread_count.times do |i|
  thread_title = "flush_thread_#{i}".to_sym
  thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
  thread = thread_create(thread_title) do
    flush_thread_run(thread_state)
  end

@Garfield96
Contributor Author

Hi @daipom,

Thanks for the review. I agree that using a hash makes the code simpler. Unfortunately, hashes are not thread-safe in Ruby, and inserting the CacheEntries could result in data-races. Therefore, we would again need a mutex. The array solution has the advantage that the mutex is only used during a thread's first request, to retrieve the index; afterwards, the code is lock-free.
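
For comparison, a mutex-guarded hash insert would look roughly like this (a hypothetical sketch of the concern, not code from this PR; start_connection stands in for the Net::HTTP setup):

# Hypothetical sketch: guarding every cache insert with a mutex.
def cached_entry_for(uri)
  key = Thread.current.name
  @cache_mutex.synchronize do
    # Every request takes the lock, not just a thread's first one.
    @cache_by_thread[key] ||= CacheEntry.new(uri.to_s, start_connection(uri))  # start_connection: hypothetical helper
  end
end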

@daipom
Contributor

daipom commented Feb 5, 2024

@Garfield96
Thanks for considering!

Unfortunately, hashes are not thread-safe in Ruby and inserting the CacheEntries could result in data-races.

I think it's basically thread-safe, although some of the features are not thread-safe (e.g. default_proc (https://bugs.ruby-lang.org/issues/19237)).

The ||= in my suggestion may not be thread-safe, but Thread.current.name is a key unique to each thread.

So, I don't expect my suggestion to cause data races.
Have you actually checked the data race?

Signed-off-by: Christian Norbert Menges <[email protected]>
Signed-off-by: Christian Norbert Menges <[email protected]>
Signed-off-by: Christian Norbert Menges <[email protected]>
@daipom daipom force-pushed the out-http-reuse-connections branch from 7bb7553 to fdf8914 on April 30, 2024 at 03:22
Signed-off-by: Daijiro Fukuda <[email protected]>
Co-authored-by: Daijiro Fukuda <[email protected]>
@daipom daipom force-pushed the out-http-reuse-connections branch from fdf8914 to af18531 on April 30, 2024 at 03:23
@daipom
Contributor

daipom commented Apr 30, 2024

Rebased onto the latest master and added my DCO.

@daipom
Contributor

daipom commented Apr 30, 2024

I have added only the following additional fixes.

  • Move the cache struct definition outside of the constructor.
  • Make some parameter names more specific.

I have not added commits for the logic simplification I am suggesting, because we do not yet have consensus in the discussion about the data race concerns (#4330 (comment)).

Contributor

@daipom daipom left a comment

Looks good to me, although there are still some discussions about code simplification.

@@ -302,16 +318,41 @@ def create_request(chunk, uri)
  req
end

def make_request_cached(uri, req)
  id = Thread.current.thread_variable_get(plugin_id)
Member

It's not appropriate to use plugin_id alone as the key; it should be used with a suffix, like "#{plugin_id}_connection_cache_id".

In addition, we prefer to use Thread#[] instead of Thread#thread_variable_get when there is no particular reason to do otherwise.

Contributor

Thanks. I agree.
Done in 9ce635c.

and also the following (sketched after this list):

* Use `Thread#[]` style
    * because it is the common style in the Fluentd codebase
* Rename `connection_cache_id` to `connection_cache_next_id`
    * for clarity.
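
Assembled from the comments above, the id lookup plausibly ends up like the following sketch (names are taken from this thread, not verified against commit 9ce635c, and connection_cache_id_for_current_thread is an illustrative method name):

def connection_cache_id_for_current_thread
  cache_id_key = "#{plugin_id}_connection_cache_id"
  id = Thread.current[cache_id_key]  # Thread#[], the style preferred in the Fluentd codebase
  if id.nil?
    @cache_id_mutex.synchronize do
      id = @connection_cache_next_id
      @connection_cache_next_id += 1
    end
    Thread.current[cache_id_key] = id
  end
  id
end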

Signed-off-by: Daijiro Fukuda <[email protected]>
Member

@ashie ashie left a comment

LGTM

@ashie
Member

ashie commented Apr 30, 2024

Thanks!

@ashie ashie merged commit 8804a80 into fluent:master Apr 30, 2024
15 of 16 checks passed
@daipom
Contributor

daipom commented Apr 30, 2024

Thanks so much @Garfield96 !
Sorry for taking a long time. 😢

daipom added a commit to fluent/fluentd-docs-gitbook that referenced this pull request May 15, 2024
New feature of Fluentd v1.17.0.
Related: fluent/fluentd#4330

Signed-off-by: Daijiro Fukuda <[email protected]>